package com.dec.kks.etl;

import com.dec.kks.etl.producer.DECConfigUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;

import java.util.*;

/**
 * Spark Streaming driver that consumes a Kafka topic via the kafka010 direct
 * stream, logs each record's partition and value to stdout, and commits the
 * consumed offset ranges back to Kafka asynchronously.
 *
 * <p>Usage: {@code FFTKafkaMain [configDir] [topic]} — both arguments are
 * optional; omitting them reproduces the original hard-coded behavior.
 */
public class FFTKafkaMain {

    /** Directory holding the consumer properties read by DECConfigUtil (original hard-coded path). */
    private static final String DEFAULT_CONFIG_DIR =
            "/home/hdfs/soft/IdeaProjects/dec-kks-etl/src/main/resources";

    /** Kafka topic consumed when no override is given on the command line. */
    private static final String DEFAULT_TOPIC = "test-dec-rt-dealed-t00101";

    /**
     * Entry point: builds the streaming context, wires the Kafka direct stream,
     * prints record values per batch, and commits offsets after logging them.
     *
     * @param args optional overrides: {@code args[0]} = config directory,
     *             {@code args[1]} = topic name
     * @throws Exception if the streaming context is interrupted or fails to start
     */
    public static void main(String[] args) throws Exception {
        // Backward-compatible CLI overrides; defaults match the previous constants.
        String configDir = args.length > 0 ? args[0] : DEFAULT_CONFIG_DIR;
        String topic = args.length > 1 ? args[1] : DEFAULT_TOPIC;

        SparkConf conf = new SparkConf().setMaster("local[10]").setAppName("dec-kks-etl");
        // Drain in-flight batches on shutdown instead of dropping them.
        conf.set("spark.streaming.stopGracefullyOnShutdown", "true");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        // Throttle ingestion: at most 1 record per partition per batch.
        conf.set("spark.streaming.kafka.maxRatePerPartition", "1");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        Map<String, Object> kafkaParams = DECConfigUtil.loadConfig(configDir, 0);
        Collection<String> topics = Collections.singletonList(topic);

        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        // Per partition: log each record's partition id ("分区：" = "partition:")
        // and collect the values for downstream printing.
        JavaDStream<String> mps = stream.mapPartitions(
                (FlatMapFunction<Iterator<ConsumerRecord<String, String>>, String>) records -> {
                    List<String> values = new ArrayList<>();
                    while (records.hasNext()) {
                        ConsumerRecord<String, String> record = records.next();
                        System.out.println("分区：" + record.partition());
                        values.add(record.value());
                    }
                    return values.iterator();
                });
        mps.print();

        // Only the input DStream (not derived streams) can commit offsets back to Kafka.
        CanCommitOffsets committer = (CanCommitOffsets) stream.inputDStream();
        stream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, String>>>) rdd -> {
            // The HasOffsetRanges cast is only valid on the RDD produced directly
            // by createDirectStream, before any transformation.
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            for (OffsetRange o : offsetRanges) {
                System.out.println(o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
            }
            // Async, at-least-once: offsets are committed after the batch's RDD is processed.
            committer.commitAsync(offsetRanges);
        });

        jssc.start();
        jssc.awaitTermination();
    }
}
